
@zhengruifeng (Contributor) commented on Oct 21, 2025

What changes were proposed in this pull request?

Avoid creating an intermediate pandas DataFrame in `df.toPandas`.

Before: batches -> table -> intermediate pdf -> result pdf (based on `pa.Table.to_pandas`)

After: batches -> table -> result pdf (based on `pa.ChunkedArray.to_pandas`)
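
In other words, each column of the combined Arrow table is converted to pandas on its own, so the whole-table intermediate DataFrame is never materialized. A minimal sketch of the idea (the helper name and plumbing are illustrative, not the actual PySpark internals):

```
import pandas as pd
import pyarrow as pa

def batches_to_pandas(batches):
    # batches -> table
    table = pa.Table.from_batches(batches)
    # old path: table -> intermediate pdf -> result pdf
    #   pdf = table.to_pandas()
    # new path: table -> result pdf, one pa.ChunkedArray at a time
    series = [col.to_pandas() for col in table.columns]
    return pd.concat(series, axis="columns", keys=table.column_names)
```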

Why are the changes needed?

The intermediate pandas DataFrame can be skipped.

A simple benchmark on my local machine:


```
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

import time
from pyspark.sql import functions as sf

df = spark.range(1000000).select(
    (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
)
cols = {f"col_{i}": sf.lit(f"c{i}") for i in range(100)}
df = df.withColumns(cols)
df.cache()
df.count()

pdf = df.toPandas()  # warm up

start_arrow = time.perf_counter()
for i in range(100):
    pdf = df.toPandas()

time.perf_counter() - start_arrow
```

master: 304.49954012501985 secs
this PR: 285.2997682078276 secs

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@zhengruifeng zhengruifeng changed the title [WIP][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas [SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas Oct 21, 2025
@zhengruifeng zhengruifeng requested review from HyukjinKwon and ueshin and removed request for HyukjinKwon October 23, 2025 02:15
@zhengruifeng (Contributor, Author) commented:

cc @Yicong-Huang

@zhengruifeng zhengruifeng marked this pull request as draft October 24, 2025 01:45
@zhengruifeng zhengruifeng changed the title [SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas [WIP][SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas Oct 24, 2025
@zhengruifeng zhengruifeng marked this pull request as ready for review November 4, 2025 06:32
@zhengruifeng zhengruifeng changed the title [WIP][SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas [SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas Nov 4, 2025
@zhengruifeng zhengruifeng force-pushed the avoid_unnecessary_pdf_creation branch from adc17b5 to 2ba05e2 on November 4, 2025 06:33
@Yicong-Huang (Contributor) commented:

I think it is a good change!

@zhengruifeng (Contributor, Author) commented:

merged to master

@zhengruifeng zhengruifeng deleted the avoid_unnecessary_pdf_creation branch November 5, 2025 01:10
zhengruifeng pushed a commit that referenced this pull request Nov 12, 2025
… during spark connect toPandas()

### What changes were proposed in this pull request?

This PR optimizes the `to_pandas()` method in the Spark Connect client to avoid creating an intermediate pandas DataFrame during Arrow to pandas conversion.

**Key changes:**
- Convert Arrow columns directly to pandas Series using `arrow_col.to_pandas()` instead of converting the entire table first with `table.to_pandas()`
- Eliminate temporary column renaming (`col_0`, `col_1`, etc.) since we no longer create an intermediate DataFrame
- Apply Spark-specific type converters directly to each Series without going through an intermediate DataFrame (see the sketch below)
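
A minimal sketch of the per-column conversion described above (the helper and the `converters` mapping are illustrative, not the exact Spark Connect client code):

```
import pandas as pd
import pyarrow as pa

def table_to_pandas(table: pa.Table, converters: dict) -> pd.DataFrame:
    # converters: hypothetical mapping from field name to an optional
    # Spark-specific converter (e.g. for dates, timestamps, structs).
    series = []
    for name, arrow_col in zip(table.column_names, table.columns):
        s = arrow_col.to_pandas()      # pa.ChunkedArray -> pd.Series
        conv = converters.get(name)
        if conv is not None:
            s = conv(s)                # convert the Series directly, no intermediate DataFrame
        series.append(s)
    # Assemble the result once; no temporary col_0/col_1 names to rename.
    return pd.concat(series, axis="columns", keys=table.column_names)
```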

### Why are the changes needed?

This optimization brings Spark Connect's `to_pandas()` implementation in line with the regular Spark DataFrame optimization made in PR #52680 ([SPARK-53967](https://issues.apache.org/jira/browse/SPARK-53967)).

**Benefits:**
1. **Reduced memory usage**: Eliminates allocation of intermediate DataFrame
2. **Better performance**: Fewer data copies, better memory locality
3. **Consistency**: Makes Spark Connect code path match the optimized regular Spark DataFrame path

### Does this PR introduce _any_ user-facing change?

No. This is a pure performance optimization with no API or behavior changes.

### How was this patch tested?
**Benchmark setup** (for manual testing):
- 1M rows × 102 columns
- Mixed types: ~25 complex columns (Date, Timestamp, Struct) + ~77 simple columns (Int, Double, String)
- Batch size: 5,000 rows per batch
- Config: Arrow enabled, self-destruct enabled

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
import time

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")  # Small batches: 5k rows (~1.5MB/batch)

# Large dataset: 1M rows with MIXED data types
df = spark.range(1000000).select(
    sf.col("id"),
    (sf.col("id") % 2).alias("key"),
    sf.col("id").alias("v")
)

# Add various column types to test conversion performance. These types need Spark-specific conversion:
df = df.withColumns({
    "date_col_1": sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(365)).cast("int")),
    "date_col_2": sf.date_add(sf.to_date(sf.lit("2023-01-01")), (sf.col("id") % sf.lit(180)).cast("int")),
    "timestamp_col": sf.current_timestamp(),
    "struct_col_1": sf.struct(sf.col("id").cast("long").alias("a"), (sf.col("id") * sf.lit(2)).cast("long").alias("b")),
    "struct_col_2": sf.struct((sf.col("id") % sf.lit(10)).cast("int").alias("x"), (sf.col("id") / sf.lit(100.0)).alias("y")),
    "array_col": sf.array(sf.lit(1), sf.lit(2), sf.lit(3)),
    "double_col_1": sf.col("id") / sf.lit(3.14),
    "double_col_2": sf.col("id") * sf.lit(1.5) + sf.lit(100),
    "int_col": (sf.col("id") % sf.lit(1000)).cast("int"),
})

# Add more mixed columns - some simple, some complex
for i in range(45):
    if i % 5 == 0:
        df = df.withColumn(f"mixed_{i}",
            sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(i + 1)).cast("int")))
    elif i % 5 == 1:
        df = df.withColumn(f"mixed_{i}",
            sf.struct(sf.lit(i).alias("idx"), (sf.col("id") % sf.lit(i + 1)).cast("long").alias("val")))
    elif i % 5 == 2:
        df = df.withColumn(f"mixed_{i}",
            sf.concat(sf.lit(f"str_{i}_"), (sf.col("id") % sf.lit(100)).cast("string")))
    else:
        df = df.withColumn(f"mixed_{i}", (sf.col("id") * sf.lit(i) + sf.lit(i)) % sf.lit(1000))

# Add some constant strings for variety
for i in range(45):
    df = df.withColumn(f"const_{i}", sf.lit(f"c{i}"))

df = df.drop("id")
df.cache()
df.count()

# Warm up
pdf = df.toPandas()
del pdf

# Benchmark
start = time.perf_counter()
total_rows = 0
total_sum = 0

for i in range(20):
    # Convert to pandas
    pdf = df.toPandas()
    total_rows += len(pdf)
    total_sum += pdf['v'].sum()
    del pdf

    if (i + 1) % 5 == 0:
        elapsed = time.perf_counter() - start
        print(f"  {i + 1}/20 completed ({elapsed:.1f}s elapsed, ~{elapsed/(i+1):.2f}s per iteration)")

elapsed = time.perf_counter() - start
```

**Manual benchmarking results**: 6.5% improvement with mixed data types (dates, timestamps, structs, arrays, and simple types)
   - Before: 129.3s for 20 iterations (6.46s per iteration)
   - After: 120.9s for 20 iterations (6.04s per iteration)

### Was this patch authored or co-authored using generative AI tooling?

Yes. Co-Generated-by Cursor

Closes #52979 from Yicong-Huang/SPARK-54183/refactor/avoid-intermediate-df-in-topandas-connect.

Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng pushed a commit that referenced this pull request Nov 13, 2025
…das`

### What changes were proposed in this pull request?

Following up on #52680, this PR optimizes the non-Arrow path of `toPandas()` to eliminate intermediate DataFrame creation.

**Key optimizations:**

1. **Avoid intermediate DataFrame copy**
   - `pd.DataFrame.from_records(rows)` → Direct column extraction via `zip(*rows)`
   - 2 DataFrame creations → 1 DataFrame creation

2. **Optimize column-by-column conversion** (especially for wide tables)
   - Tuples → Lists for faster Series construction
   - Implicit dtype inference → Explicit `dtype=object`
   - `pd.concat(axis="columns")` + column rename → `pd.concat(axis=1, keys=columns)`
   - Result: **43-67% speedup for 50-100 columns** (see the sketch below)
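
A minimal sketch of both optimizations, assuming `rows` is the collected list of row tuples and `columns` holds the result column names (not the literal PySpark code):

```
import pandas as pd

def rows_to_pandas(rows, columns):
    # Direct column extraction: transpose the row tuples once instead of
    # building a throwaway DataFrame with pd.DataFrame.from_records(rows).
    column_data = zip(*rows) if rows else ([] for _ in columns)
    series = [
        # list(...) builds each Series faster than a tuple would, and
        # dtype=object skips per-column dtype inference on raw row data.
        pd.Series(list(col), dtype=object)
        for col in column_data
    ]
    # One concat with keys= sets the final column names, so no rename pass.
    return pd.concat(series, axis=1, keys=columns)
```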

### Why are the changes needed?

**Problem:** The current flow creates a DataFrame twice:
- `rows` → `pd.DataFrame.from_records()` → temporary DataFrame → `pd.concat()` → final DataFrame

The intermediate DataFrame is immediately discarded, wasting memory. This is especially inefficient for wide tables where column-by-column overhead is significant.

### Does this PR introduce _any_ user-facing change?

No. This is a pure performance optimization with no API or behavior changes.

### How was this patch tested?

- Existing unit tests.
- Benchmark

**Benchmark setup** (a minimal harness sketch follows the list):
- **Hardware**: Driver memory 4GB, Executor memory 4GB
- **Configuration**: `spark.sql.execution.arrow.pyspark.enabled=false` (testing non-Arrow path)
- **Iterations**: 10 iterations per test case for statistical reliability
- **Test cases**:
  - Simple (numeric columns)
  - Mixed (int, string, double, boolean)
  - Timestamp (date and timestamp types)
  - Nested (struct and array types)
  - Wide (5, 10, 50, 100 column counts)
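
A minimal harness along the lines of this setup (`spark` is an existing session; only the wide case is shown and the column mix is illustrative):

```
import time
from pyspark.sql import functions as sf

# Force the non-Arrow collection path that this change optimizes.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Wide case: 100K rows x 50 columns.
df = spark.range(100000).withColumns(
    {f"c{i}": (sf.col("id") * sf.lit(i)).cast("double") for i in range(50)}
)
df.cache()
df.count()

df.toPandas()  # warm up

start = time.perf_counter()
for _ in range(10):
    pdf = df.toPandas()
print(f"wide (50 cols): {(time.perf_counter() - start) / 10:.3f}s per iteration")
```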

### Performance Results

**General Benchmark** (10 iterations):

| Test Case  | Rows | OLD → NEW | Speedup |
|------------|------|-----------|---------|
| simple     | 1M   | 1.376s → 1.383s | ≈ Tied |
| mixed      | 1M   | 2.396s → 2.553s | 6% slower |
| timestamp  | 500K | 4.323s → 4.392s | ≈ Tied |
| nested     | 100K | 0.558s → 0.580s | 4% slower |
| wide (50)  | 100K | 1.458s → **1.141s** | **28% faster** 🚀 |

**Column Width Benchmark** (100K rows, 10 iterations):

| Columns | OLD → NEW | Speedup |
|---------|-----------|---------|
| 5       | 0.188s → 0.179s | 5% faster |
| 10      | 0.262s → 0.270s | ≈ Tied |
| 50      | 1.430s → **0.998s** | **43% faster** 🚀 |
| 100     | 3.320s → **1.988s** | **67% faster** 🚀 |

### Was this patch authored or co-authored using generative AI tooling?

Yes. Co-Generated-by Cursor

Closes #52897 from Yicong-Huang/SPARK-54182/refactor/avoid-intermedia-df-in-non-arrow-toPandas.

Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>